package com.isyscore.os.metadata.kettle.step;

import com.isyscore.os.metadata.enums.KettleDataFlowNodeType;
import com.isyscore.os.metadata.enums.KettleDataSourceType;
import com.isyscore.os.metadata.kettle.base.FlowConfig;
import com.isyscore.os.metadata.kettle.base.Step;
import com.isyscore.os.metadata.kettle.vis.FieldTuple;
import com.isyscore.os.metadata.kettle.vis.TableOutputNode;
import com.isyscore.os.metadata.kettle.vis.VisNode;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.plugins.PluginInterface;
import org.pentaho.di.core.plugins.PluginRegistry;
import org.pentaho.di.core.plugins.StepPluginType;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.StepMetaInterface;
import org.pentaho.di.trans.steps.tableoutput.TableOutputMeta;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;


/**
 * Builds the Kettle "TableOutput" step from a {@link TableOutputNode} of the visual flow definition.
 */
public class TableOutputStep implements Step {

    /** Commit size applied when the node does not specify a batch size. */
    private static final int DEFAULT_COMMIT_SIZE = 1000;

    /** Plugin id for which no schema name is set on the step. */
    private static final String MYSQL_PLUGIN_ID = "MYSQL";

    /** Step type id of an upstream step that forces this step to run as a single copy. */
    private static final String SORT_ROWS_TYPE_ID = "SortRows";

    /**
     * Converts the given visual node into a configured table-output {@link StepMeta}.
     *
     * @param step       the visual node; must be a {@link TableOutputNode}
     * @param databases  not used by this implementation (connections are looked up via {@code transMeta})
     * @param transMeta  transformation whose registered databases and steps are consulted
     * @param transGraph flow configuration providing the hops and the optional extra parameters
     * @return the configured step meta
     * @throws IllegalStateException if the connection referenced by the node is not registered in {@code transMeta}
     * @throws Exception             if the step plugin cannot be loaded or the configuration is invalid
     */
    @Override
    public StepMeta decode(VisNode step, List<DatabaseMeta> databases, TransMeta transMeta, FlowConfig transGraph) throws Exception {
        TableOutputNode node = (TableOutputNode) step;

        PluginRegistry registry = PluginRegistry.getInstance();
        PluginInterface sp = registry.findPluginWithId(StepPluginType.class, KettleDataFlowNodeType.TableOutput.name());
        StepMetaInterface stepMetaInterface = (StepMetaInterface) registry.loadClass(sp);

        TableOutputMeta tableOutputMeta = (TableOutputMeta) stepMetaInterface;

        // Connections are looked up by the key "<dataSourceId>-<databaseName>".
        String connectionName = node.getDataSourceId() + "-" + node.getDatabaseName();
        DatabaseMeta database = DatabaseMeta.findDatabase(transMeta.getDatabases(), connectionName);
        if (database == null) {
            // Fail fast with context: the plugin id of the connection is required further below,
            // so continuing would only end in an anonymous NullPointerException.
            throw new IllegalStateException("Database connection '" + connectionName
                    + "' not found for table output node '" + step.getNodeName() + "'");
        }

        tableOutputMeta.setDatabaseMeta(database);
        if (!MYSQL_PLUGIN_ID.equals(database.getPluginId())) {
            // For non-MySQL connections the schema is the part after the first dot, or the whole name.
            String databaseName = step.getDatabaseName();
            tableOutputMeta.setSchemaName(databaseName.contains(".") ? databaseName.split("\\.")[1] : databaseName);
        }

        // Decide whether the task should be retried on failure.
        if (transGraph.getOtherParam() != null) {
            tableOutputMeta.setRetryRun(Integer.parseInt(transGraph.getOtherParam().getOrDefault("retryRun", "0")) == 1);
            // The definitionId distinguishes the temporary tables of different tasks during a retry.
            String definitionId = transGraph.getOtherParam().get("definitionId");
            tableOutputMeta.setDefinitionId(definitionId);
        }

        tableOutputMeta.setDbType(String.valueOf(KettleDataSourceType.getType(database.getPluginId()).getCode()));

        tableOutputMeta.setTableName(node.getTableName());
        tableOutputMeta.setTruncateTable(node.getTruncate()); // whether to truncate the table first
        tableOutputMeta.setSpecifyFields(true); // explicit field mapping is always used

        List<FieldTuple> fieldMapping = node.getFieldMapping();
        List<String> columnNames = new ArrayList<>();
        List<String> streamNames = new ArrayList<>();
        for (FieldTuple fieldTuple : fieldMapping) {
            columnNames.add(fieldTuple.getToField());
            streamNames.add(fieldTuple.getFromField());
        }

        tableOutputMeta.setFieldDatabase(columnNames.toArray(new String[0]));
        tableOutputMeta.setFieldStream(streamNames.toArray(new String[0]));

        // Batch (commit) size; falls back to the default when the node leaves it unset.
        tableOutputMeta.setCommitSize(node.getBatchSize() == null ? DEFAULT_COMMIT_SIZE : node.getBatchSize());

        StepMeta stepMeta = new StepMeta(KettleDataFlowNodeType.TableOutput.name(), step.getNodeName(), stepMetaInterface);

        // Number of parallel copies; a single copy when the node leaves it unset.
        stepMeta.setCopiesString(String.valueOf(node.getCopies() == null ? 1 : node.getCopies()));

        // Names of the steps that have a hop leading into this node.
        List<String> preNodes = transGraph.getTransHups().stream()
                .filter(hup -> hup.getTo().equals(step.getNodeName()))
                .map(hup -> hup.getFrom())
                .collect(Collectors.toList());

        for (StepMeta transMetaStep : transMeta.getSteps()) {
            if (preNodes.contains(transMetaStep.getName()) && SORT_ROWS_TYPE_ID.equals(transMetaStep.getTypeId())) {
                // Sorted output must be written by exactly one copy, otherwise the row order gets mixed up.
                stepMeta.setCopiesString("1");
                break;
            }
        }

        stepMeta.setDraw(true);
        return stepMeta;
    }

    /**
     * No post-processing is done for table-output nodes.
     *
     * @return always {@code null}
     */
    @Override
    public StepMeta after(VisNode step, List<DatabaseMeta> databases, TransMeta transMeta, FlowConfig transGraph) throws Exception {
        return null;
    }
}
